package com.shujia.flink.sql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object Demo1WordCOunt {
  def main(args: Array[String]): Unit = {
    //环境设置对象
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      //.inBatchMode()
      .build()

    /**
     * flink sql环境
     *
     */
    val table: TableEnvironment = TableEnvironment.create(settings)


    /**
     * 1、创建kafka source表
     *
     */

    table.executeSql(
      """
        |CREATE TABLE words (
        |  `word` STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'words',
        |  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
        |  'properties.group.id' = 'asdasd',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'csv'
        |)
        |""".stripMargin)

    /**
     * 创建一个print sink表
     *
     */
    table.executeSql(
      """
        |CREATE TABLE print_table (
        |word STRING,
        |c BIGINT
        |) WITH (
        | 'connector' = 'print'
        |)
        |""".stripMargin)


    /**
     * 统计单词的数量
     *
     */
    table.executeSql(
      """
        |insert into print_table
        |select word,count(1) as c
        |from words
        |group by word
        |
        |""".stripMargin)


  }

}
